package com.iteaj.iot.test.mqtt;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.iteaj.iot.IotThreadManager;
import com.iteaj.iot.client.ClientProtocolHandle;
import com.iteaj.iot.client.mqtt.MqttClient;
import com.iteaj.iot.client.mqtt.MqttDecoderInterceptor;
import com.iteaj.iot.client.mqtt.impl.*;
import com.iteaj.iot.codec.filter.DecoderInterceptor;
import com.iteaj.iot.test.IotTestHandle;
import com.iteaj.iot.test.IotTestProperties;
import com.iteaj.iot.test.TestConst;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.timeout.IdleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * create time: 2021/9/3
 *
 * @author iteaj
 * @since 1.0
 */
public class MqttClientTestHandle implements ClientProtocolHandle<MqttPublishTestProtocol>, IotTestHandle {

    @Autowired
    private IotTestProperties properties;
    @Autowired(required = false)
    private DefaultMqttComponent defaultMqttComponent;
    private Logger logger = LoggerFactory.getLogger(getClass());

    public static final String TOPIC_RESPONSE = "iteaj/test/cus/response";
    public static final String AT_MOST_ONCE_TOPIC = "iteaj/test/iot/atMostOnce/0";
    public static final String EXACTLY_ONCE_TOPIC = "iteaj/test/iot/exactlyOnce/2";
    public static final String AT_LEAST_ONCE_TOPIC = "iteaj/test/iot/atLeastOnce/1";

    @Override
    public Object handle(MqttPublishTestProtocol protocol) {
        return null;
    }

    @Override
    public void start() throws Exception{
        IotTestProperties.TestMqttConnectProperties config = properties.getMqtt();

        long ReaderIdleTime = 36;
        if(defaultMqttComponent != null) {
            defaultMqttComponent.setInterceptor(new MqttDecoderInterceptor() {

                @Override
                public Object idle(String deviceSn, IdleState state) {
                    logger.info(TestConst.LOGGER_PROTOCOL_FUNC_DESC, defaultMqttComponent.getName()
                            , "MqttDecoderInterceptor.idle()", deviceSn, "通过");
                    return MqttDecoderInterceptor.super.idle(deviceSn, state);
                }
            });

            final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
            String subscriptionClientId = "Iot:Client:Listener";
            defaultMqttComponent.createNewClientAndConnect(new DefaultMqttConnectProperties(config.getHost()
                    , config.getPort(), subscriptionClientId, protocol -> {

                DefaultMqttConnectProperties properties = protocol.requestMessage().getProperties();
                byte[] message = protocol.requestMessage().getMessage();
                String value = new String(message);
                if(value.equals("iteaj/subscription/create")) {
                    logger.info(TestConst.LOGGER_PROTOCOL_FUNC_DESC, defaultMqttComponent.getName()
                            , "createNewClientAndConnect(DefaultMqttConnectProperties)", properties.getClientId(), "通过" );
                    defaultMqttComponent.unsubscribe(subscriptionClientId, "iteaj/subscription/create/#").addListener(future -> {
                        if(future.isSuccess()) {
                            TimeUnit.SECONDS.sleep(1); // 等待取消订阅完成
                            new DefaultMqttPublishProtocol("iteaj/unsubscription/create".getBytes(StandardCharsets.UTF_8), "iteaj/subscription/create/68").request();
                        }
                    });
                } else if(value.equals("iteaj/subscription/newAdd")) {
                    logger.info(TestConst.LOGGER_PROTOCOL_FUNC_DESC, defaultMqttComponent.getName()
                            , "subscribe", properties.getClientId(), "通过" );
                    defaultMqttComponent.unsubscribe(subscriptionClientId, "iteaj/subscription/newAdd/#").addListener(future -> {
                        if(future.isSuccess()) {
                            TimeUnit.SECONDS.sleep(1); // 等待取消订阅完成
                            new DefaultMqttPublishProtocol("iteaj/unsubscription/newAdd".getBytes(StandardCharsets.UTF_8), "iteaj/subscription/newAdd/68").request();

                            // 如果10s内没有取消掉说明取消订阅成功
                            schedule[0] = IotThreadManager.instance().getExecutorService().schedule(() -> {
                                logger.info(TestConst.LOGGER_PROTOCOL_FUNC_DESC, defaultMqttComponent.getName()
                                        , "unsubscription", properties.getClientId(), "通过");
                            }, 10, TimeUnit.SECONDS);
                        }
                    });
                } else {
                    schedule[0].cancel(true);
                    logger.error(TestConst.LOGGER_PROTOCOL_FUNC_DESC, defaultMqttComponent.getName(), value, properties.getClientId(), "失败" );
                }
                // 取消订阅
            }));

            IotThreadManager.instance().getExecutorService().schedule(() -> {
                // subscribe测试
                defaultMqttComponent.subscribe(subscriptionClientId, "iteaj/subscription/create/#", MqttQoS.AT_LEAST_ONCE);
                defaultMqttComponent.subscribe(subscriptionClientId, "iteaj/subscription/newAdd/#", MqttQoS.AT_LEAST_ONCE);

                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                new DefaultMqttPublishProtocol("iteaj/subscription/create".getBytes(StandardCharsets.UTF_8), "iteaj/subscription/create/68").request();
                new DefaultMqttPublishProtocol("iteaj/subscription/newAdd".getBytes(StandardCharsets.UTF_8), "iteaj/subscription/newAdd/68").request();
            }, 5, TimeUnit.SECONDS);
        }

        DefaultMqttConnectProperties willTopicConfig = new DefaultMqttConnectProperties("WillTopicByNotRetain", null);
        // 不保留遗嘱的测试客户端
        BeanUtils.copyProperties(config, willTopicConfig, "clientId");
        willTopicConfig.setWillTopic("iteaj/willTopic/iot/"+willTopicConfig.getClientId());
        willTopicConfig.setWillRetain(false); // 不保留的遗嘱
        willTopicConfig.setWillQos(MqttQoS.AT_LEAST_ONCE);
        willTopicConfig.setWillMessage("{\"retain\": false}"); // 不保留遗嘱测试
        willTopicConfig.setReaderIdleTime(ReaderIdleTime); // 做idleTime测试
        MqttClient retainWillClient = defaultMqttComponent.createNewClientAndConnect(willTopicConfig);

        DefaultMqttConnectProperties willRetainTopicConfig = new DefaultMqttConnectProperties("WillTopicByRetain", null);
        // 保留遗嘱的测试客户端
        BeanUtils.copyProperties(config, willRetainTopicConfig, "clientId");
        willRetainTopicConfig.setWillTopic("iteaj/willTopic/iot/"+willRetainTopicConfig.getClientId());
        willRetainTopicConfig.setWillRetain(true); // 保留的遗嘱
        willRetainTopicConfig.setWillQos(MqttQoS.AT_MOST_ONCE);
        willRetainTopicConfig.setWillMessage("{\"retain\": true}"); // 保留遗嘱测试
        MqttClient willClient = defaultMqttComponent.createNewClientAndConnect(willRetainTopicConfig);


        System.out.println("---------------------------------------------------- 开始mqtt测试 ----------------------------------------------------------");
        DefaultMqttConnectProperties willTopicClientId = new DefaultMqttConnectProperties("WillTopicClientId"
                , Arrays.asList(new MqttTopicSubscription("iteaj/willTopic/iot/#", MqttQoS.AT_MOST_ONCE)), protocol -> {
            byte[] message = protocol.requestMessage().getMessage();
            JSONObject jsonObject = JSONUtil.parseObj(new String(message));
            if(jsonObject.containsKey("retain")) {
                logger.info(TestConst.LOGGER_MQTT_PROTOCOL_DESC, defaultMqttComponent.getName()
                        , "WillTopic", protocol.getTopic(), protocol.getEquipCode(), "-", "通过" );
            }
        });

        BeanUtils.copyProperties(config, willTopicClientId, "clientId");
        defaultMqttComponent.createNewClientAndConnect(willTopicClientId);

        // 断线发送测试
        defaultMqttComponent.createNewClientAndConnect(new DefaultMqttConnectProperties(config.getHost()
                        , config.getPort(), "DisconnectRequest", Arrays.asList(new MqttTopicSubscription("iteaj/test/iot/#", MqttQoS.AT_MOST_ONCE))
                , protocol -> {
            DefaultMqttConnectProperties properties = protocol.requestMessage().getProperties();
            logger.info(TestConst.LOGGER_PROTOCOL_FUNC_DESC, defaultMqttComponent.getName()
                    , "disconnect()+request()", properties.getClientId(), "通过");
                })
        );
        TimeUnit.SECONDS.sleep(3);
        willClient.close(); // 断线移除测试
        retainWillClient.disconnect().syncUninterruptibly(); // 断线重连测试

        // 测试最多发送一次报文
        new DefaultMqttPublishProtocol(AT_MOST_ONCE_TOPIC.getBytes(), MqttQoS.AT_MOST_ONCE, AT_MOST_ONCE_TOPIC).request(retainWillClient.getConfig());

        TimeUnit.SECONDS.sleep(5);
    }

    @Override
    public int getOrder() {
        return 1000 * 60;
    }
}
